Message System
Lihil has built-in support for both in-process message handling (beta) and out-of-process message handling (still being implemented). It is recommended to use EventBus over BackGroundTask for event handling.
There are three primitives for events:
- publish: asynchronous, blocking event handling that shares the same scope as the caller.
- emit: non-blocking asynchronous event handling that has its own scope.
- sink: a thin wrapper around an external dependency for data persistence, such as a message queue or database.
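The difference between publish and emit can be illustrated with a minimal sketch in plain asyncio. ToyBus and its methods are hypothetical stand-ins written for this example, not Lihil's EventBus, and the sketch does not model dependency scopes; it only shows that a publish-style call returns after the handlers have run, while an emit-style call returns before they do.

```python
import asyncio


class ToyBus:
    """Hypothetical stand-in for an event bus; not Lihil's EventBus."""

    def __init__(self, handlers):
        self.handlers = handlers
        self.background = set()

    async def publish(self, event):
        # The caller waits here until every handler has finished, in order.
        for handler in self.handlers:
            await handler(event)

    def emit(self, event):
        # Schedule the handlers and return immediately. A reference to the
        # task is kept so it is not garbage-collected before it completes.
        task = asyncio.create_task(self.publish(event))
        self.background.add(task)
        task.add_done_callback(self.background.discard)
        return task


async def demo():
    log = []

    async def handler(event):
        await asyncio.sleep(0)  # yield control once, as real I/O would
        log.append(event)

    bus = ToyBus([handler])

    await bus.publish("published")
    after_publish = list(log)  # the handler has already run

    task = bus.emit("emitted")
    after_emit = list(log)  # the handler has not run yet
    await task  # wait for the background work to finish
    return after_publish, after_emit, log


after_publish, after_emit, final_log = asyncio.run(demo())
```

In this sketch the snapshot taken right after emit does not yet contain the emitted event, which is the practical meaning of "non-blocking" in the list above.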
from typing import Annotated

from lihil import Route, status, Empty
# Assumption: MessageRegistry, BusPlugin and BusTerminal are exported by lihil.plugins.bus
from lihil.plugins.bus import BusPlugin, BusTerminal, Event, MessageRegistry, PEventBus
from lihil.plugins.testclient import LocalClient

class TodoCreated(Event):
    name: str
    content: str

async def listen_create(created: TodoCreated, ctx):
    assert created.name
    assert created.content

async def listen_twice(created: TodoCreated, ctx):
    assert created.name
    assert created.content

# Register both handlers for TodoCreated, then expose the bus to the route as a plugin
registry = MessageRegistry(event_base=Event)
registry.register(listen_create, listen_twice)
bus_plugin = BusPlugin(BusTerminal(registry))
bus_route = Route("/bus", plugins=[bus_plugin])

@bus_route.post
async def create_todo(name: str, content: str, bus: PEventBus) -> Annotated[Empty, status.OK]:
    # publish blocks until listen_create and listen_twice have both run
    await bus.publish(TodoCreated(name, content))
PEventBus is an alias for Annotated[EventBus[Any], Param("plugin")].
Event handlers
An event can have multiple event handlers; they are called in sequence. Configure your BusTerminal with a publisher, then inject it into Lihil.
- An event handler can have as many dependencies as you want, but it should contain at least two params: a subtype of Event and a subtype of MessageContext.
- If a handler is registered with a parent event, it listens to all of its sub-events. For example, a handler that listens to UserEvent will also be called when a UserCreated(UserEvent) or UserDeleted(UserEvent) event is published or emitted.
- You can also publish events during event handling. To do so, declare one of your dependencies as EventBus:
# is_expired and TodoExpired are assumed to be defined elsewhere in your application
async def listen_create(created: TodoCreated, _: Any, bus: PEventBus):
    if is_expired(created.created_at):
        event = TodoExpired.from_event(created)
        await bus.publish(event)
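
The parent-event rule described above (a handler registered for UserEvent also receives UserCreated and UserDeleted) can be sketched with a toy registry that walks the event's class hierarchy. ToyRegistry and handlers_for are hypothetical names used only for illustration; this is one plausible way to implement such a lookup, not a description of how Lihil's MessageRegistry works internally.

```python
from collections import defaultdict


class Event: ...
class UserEvent(Event): ...
class UserCreated(UserEvent): ...
class UserDeleted(UserEvent): ...


class ToyRegistry:
    """Hypothetical registry that matches handlers by event class and its parents."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def register(self, event_type, handler):
        self._handlers[event_type].append(handler)

    def handlers_for(self, event):
        # Walk the method resolution order so that handlers registered for
        # the event's own class and for each of its parent classes are found.
        found = []
        for cls in type(event).__mro__:
            found.extend(self._handlers.get(cls, []))
        return found


calls = []
registry = ToyRegistry()
registry.register(UserEvent, lambda e: calls.append(("on_user_event", type(e).__name__)))
registry.register(UserCreated, lambda e: calls.append(("on_user_created", type(e).__name__)))

for event in (UserCreated(), UserDeleted()):
    for handler in registry.handlers_for(event):
        handler(event)
```

Here the UserEvent handler fires for both events, while the UserCreated handler fires only for UserCreated.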